package com.colaiven.hbastest.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

//@Component
public class HBaseUtil {
    // 行键前缀固定长度，不足的用0补齐（后补）
    private static final int ROW_KEY_PRE_LENGTH = 40;
    // 行键参数固定长度，不足用0补齐（前补）
    private static final int ROW_KEY_OPTIONS_LENGTH = 100;
    // 行键前缀固定长度，不足的用0补齐（前补）
    private static final int ROW_KEY_SUF_LENGTH = 40;
    private static final Logger log = LoggerFactory.getLogger(HBaseUtil.class);

    /*@Value("${hbase.zookeeper.quorum}")
    private static String zookeeperQuorum;
    @Value("${hbase.zookeeper.property.clientPort}")
    private static String clientPort;
    @Value("${zookeeper.znode.parent}")
    private static String znodeParent;*/

    static Configuration conf;
    static Connection conn = null;
    static Admin admin = null;

    static {
        log.info("创建HBase连接...");
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "umaster,uslave1,uslave2");
        //conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *  创建表
     * @param tableName 表名
     * @param startKey  预分区rowKey区间开始
     * @param endKey    预分区rowKey区间结束
     * @param numRegions    分区数
     * @param columnFamilies    列族名
     */
    public static void createTable(String tableName,String startKey, String endKey, int numRegions, String ...columnFamilies) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (!admin.isTableAvailable(name)) {
            //表描述器构造器
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(name);
            List<ColumnFamilyDescriptor> columnFamilyDescriptorList = new ArrayList<>();
            for (String columnFamily : columnFamilies) {
                //列族描述起构造器
                ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
                //获得列描述
                ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
                columnFamilyDescriptorList.add(columnFamilyDescriptor);
            }
            // 设置列簇
            tableDescriptorBuilder.setColumnFamilies(columnFamilyDescriptorList);
            //获得表描述器
            TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
            //创建表
            if(startKey == null || endKey == null || numRegions == 0){
                admin.createTable(tableDescriptor);
            }else{
                admin.createTable(tableDescriptor,Bytes.toBytes(startKey),Bytes.toBytes(endKey),numRegions);
            }
        }
    }

    /**
     * 禁用表
     * @param tableName
     * @throws IOException
     */
    public static void disableTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (!admin.isTableDisabled(name)) {
            admin.disableTable(name);
        }
    }

    /**
     * 清空表
     * @param tableName 表名
     */
    public static void truncate(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        disableTable(tableName);
        admin.truncateTable(name,true);

    }

    /**
     * 删除表
     * @param tableName 表名
     */
    public static void deleteTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.isTableDisabled(name)) {
            admin.deleteTable(name);
        }
    }

    /**
     * 获取数据表列表
     */
    public static List<TableDescriptor> listTables() throws IOException {
        return admin.listTableDescriptors();
    }


    /**
     * 获取行键
     * @param prefixString
     * @param suffixString
     * @param options
     */
    public static String getRowKey(String prefixString,String suffixString,Object ...options) {
        if (prefixString.length() >ROW_KEY_PRE_LENGTH  || suffixString.length() > ROW_KEY_SUF_LENGTH) {
            return null;
        }
        StringBuilder preStringBuilder = new StringBuilder();
        preStringBuilder.append(prefixString);

        for (int i = 0; i < (ROW_KEY_PRE_LENGTH - preStringBuilder.length()); i++) {
            preStringBuilder.append("0");
        }

        StringBuilder sufStringBuilder = new StringBuilder();
        sufStringBuilder.append(suffixString);

        for (int i = 0; i < (ROW_KEY_SUF_LENGTH - sufStringBuilder.length()); i++) {
            sufStringBuilder.append("0");
        }

        StringBuilder optBuilder = new StringBuilder();

        for (Object option : options) {
            optBuilder.append(option);
        }

        if (optBuilder.length() > ROW_KEY_OPTIONS_LENGTH) {
            return null;
        }

        StringBuilder optStringBuilder = new StringBuilder();
        for (int i = 0; i < (ROW_KEY_OPTIONS_LENGTH - optBuilder.length()); i++) {
            optStringBuilder.append("0");
        }

        optStringBuilder.append(optBuilder);

        return preStringBuilder.append("|").append(optStringBuilder).append("|").append(sufStringBuilder).toString();
    }


    /**
     * 插入一条记录
     * @param tableName     表名
     * @param rowKey        rowKey
     * @param columnFamily  列族
     * @param column        列名
     * @param value         值
     */
    public static void insertOne(String tableName,String rowKey,String columnFamily,String column,String value) throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));
        //下面三个分别为，列族，列名，列值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        //得到 table
        Table table = conn.getTable(TableName.valueOf(tableName));
        //执行插入
        table.put(put);
    }

    /**
     * 插入键值集合
     * @param tableName     表名
     * @param columnFamily  列族
     * @param mapList       键值集合，rooKey用‘rowKey’作为map的key存入
     */
    public static void insertAll(String tableName, String columnFamily, List<Map<String,String>> mapList) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<>();
        for (Map<String, String> map : mapList) {
            Put put = new Put(Bytes.toBytes(map.get("rowKey")));
            Set<String> keySet = map.keySet();
            for (String key : keySet) {
                if("rowKey".equals(key)){
                    continue;
                }
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(key), Bytes.toBytes(map.get(key)));
            }

            puts.add(put);
        }
        table.put(puts);
    }

    /**
     * 单行多列多值-单列簇
     * @param tableName     表名
     * @param rowKey        rowKey
     * @param columnFamily  列族
     * @param map           键值对
     */
    public static void insertRow(String tableName, String rowKey,String columnFamily, Map<String,String> map) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Set<String> keySet = map.keySet();
        Put put = new Put(Bytes.toBytes(rowKey));
        for (String key : keySet) {
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(key), Bytes.toBytes(map.get(key)));
        }
        table.put(put);
        table.close();
    }
    /**
     * 查询一条记录
     * @param tableName 表名
     * @param rowKey    rowKey
     */
    public static Result get(String tableName, String rowKey) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        return table.get(get);
    }
    /**
     * 查询一条记录
     * @param tableName 表名
     * @param rowKey    rowKey
     * @param columnFamily    列族
     */
    public static Result get(String tableName, String rowKey, String columnFamily) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addFamily(Bytes.toBytes(columnFamily));
        return table.get(get);
    }
    /**
     * 查询一条记录
     * @param tableName 表名
     * @param rowKey    rowKey
     * @param columnFamily    列族
     * @param column    列
     */
    public static Result get(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        return table.get(get);
    }

    /**
     * 全表扫描
     * @param tableName 表名
     */
    @Deprecated
    public static ResultScanner scan(String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        return table.getScanner(scan);
    }

    /**
     * 全表扫描-列簇
     * @param tableName     表名
     * @param columnFamily  列族
     */
    @Deprecated
    public static ResultScanner scan(String tableName,String columnFamily) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(columnFamily));
        return table.getScanner(scan);
    }

    /**
     * 全表扫描-列
     * @param tableName     表名
     * @param columnFamily  列族
     * @param column        列
     */
    @Deprecated
    public static ResultScanner scan(String tableName,String columnFamily,String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
        return table.getScanner(scan);
    }

    /**
     *  获取指定rowKey区间的数据
     * @param tableName     表名
     * @param rowKeyStart   开始行
     * @param rowKeyEnd     结束行
     */
    public static ResultScanner scanByRowKey(String tableName,String rowKeyStart,String rowKeyEnd) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes(rowKeyStart));
        scan.withStopRow(Bytes.toBytes(rowKeyEnd));
        return table.getScanner(scan);
    }

    /**
     * 全表扫描-过滤器
     * @param tableName
     * @param filter
     */
    public static ResultScanner scan(String tableName, Filter filter) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.setFilter(filter);
        return table.getScanner(scan);
    }

    /**
     * 分页全表扫描-过滤器
     * @param tableName
     * @param filter
     */
    public static ResultScanner scan(String tableName, Filter filter,String startRowKey) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.setFilter(filter);
        scan.withStartRow(Bytes.toBytes(startRowKey));
        return table.getScanner(scan);
    }

    /**
     * 获取分页过滤器
     * @param size
     */
    public static Filter pageFilter(long size) {
        return new PageFilter(size);
    }
    /**
     * 更新数据
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param value
     * @throws IOException
     */
    public static void update(String tableName,String rowKey,String columnFamily,String column,String value) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        // table.close();
    }

    /**
     * 删除单行单列
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     */
    public static void delete(String tableName,String rowKey,String columnFamily,String column) throws IOException{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(delete);
    }

    /**
     * 删除单行多列
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param columnList
     * @throws IOException
     */
    public static void delete(String tableName,String rowKey,String columnFamily,String ...columnList) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        for (String column : columnList) {
            delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        }
        table.delete(delete);
    }

    /**
     * 删除单行单列簇
     * @param tableName
     * @param rowKey
     * @param columnFamily
     */
    public static void delete(String tableName,String rowKey,String columnFamily) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(Bytes.toBytes(columnFamily));
        table.delete(delete);
    }

    /**
     * 删除单行
     * @param tableName
     * @param rowKey
     */
    public static void delete(String tableName,String rowKey) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
    }

    /**
     * 删除多行
     * @param tableName
     * @param rowKeyList
     */
    public static void delete(String tableName,String ...rowKeyList) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        ArrayList<Delete> deleteList = new ArrayList<>();
        for (String rowKey : rowKeyList) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            deleteList.add(delete);
        }
        table.delete(deleteList);
    }

    /**
     * singleColumnValueFilter
     * @param columnFamily
     * @param column
     * @param compareOperator
     * @param value
     * @return
     */
    public static Filter singleColumnValueFilter(String columnFamily, String column,CompareOperator compareOperator, String value) {
        return new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(column), compareOperator, Bytes.toBytes(value));
    }

    /**
     * rowFilter
     * @param compareOperator
     * @param rowComparator
     * @return
     */
    public static Filter rowFilter(CompareOperator compareOperator,ByteArrayComparable rowComparator) {
        return new RowFilter(compareOperator, rowComparator);
    }

    /**
     * columnPrefixFilter
     * @param prefix
     * @return
     */
    public static Filter columnPrefixFilter(String prefix) {
        return new ColumnPrefixFilter(Bytes.toBytes(prefix));
    }

    /**
     * 过滤器集合
     * @param filterList
     * @return
     */
    public static FilterList filterListPassAll(Filter ...filterList) {
        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        for (Filter filter : filterList) {
            list.addFilter(filter);
        }
        return list;
    }
    /**
     * 过滤器集合
     * @param
     * @return
     */
    public static FilterList filterListPassOne(Filter ...filterList) {
        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (Filter filter : filterList) {
            list.addFilter(filter);
        }
        return list;
    }


    /**
     * 关闭连接
     */
    public static void close(){
        try {
            conn.close();
        } catch (IOException e) {
            conn = null;
        }finally {
            conn = null;
        }
    }
}
